In this project, we implement a streaming graph processing algorithm from the paper: Stefani, L. D., Epasto, A., Riondato, M., & Upfal, E. (2017). TRIÈST: Counting local and global triangles in fully dynamic streams with fixed memory size. ACM Transactions on Knowledge Discovery from Data (TKDD), 11(4), 1-50 (link: http://www.kdd.org/kdd2016/papers/files/rfp0465-de-stefaniA.pdf). First, we implement the reservoir sampling scheme described in the paper. Then we implement the two streaming graph algorithms from the paper that build on it: Trièst-Base and its improved variant, Trièst-Impr.
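In isolation, the reservoir sampling step can be sketched as follows (a minimal illustration; the function name and seed are ours, not from the paper):

```python
import random

def reservoir_sample(stream, M, seed=42):
    """Keep a uniform random sample of up to M items from a stream of unknown length."""
    random.seed(seed)
    sample = []
    for t, item in enumerate(stream, start=1):
        if t <= M:
            sample.append(item)                  # fill the reservoir first
        elif random.random() <= M / t:
            sample[random.randrange(M)] = item   # evict a uniformly chosen victim
    return sample

print(len(reservoir_sample(range(1000), 10)))  # 10
```

The key property is that after t items, every item seen so far is in the sample with equal probability M / t, which is what makes the triangle estimates below unbiased after rescaling.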
In order to ensure that our implementation is correct, we test it on the following dataset:
The Arxiv GR-QC (General Relativity and Quantum Cosmology) collaboration network is from the e-print arXiv and covers scientific collaborations between authors of papers submitted to the General Relativity and Quantum Cosmology category. If an author i co-authored a paper with author j, the graph contains an undirected edge between i and j. If a paper is co-authored by k authors, this generates a completely connected (sub)graph on those k nodes. Link: https://snap.stanford.edu/data/ca-GrQc.html
The data covers papers in the period from January 1993 to April 2003 (124 months). It begins within a few months of the inception of the arXiv, and thus represents essentially the complete history of its GR-QC section.
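The complete (sub)graph induced by one paper's author list is just the set of all unordered author pairs, which `itertools.combinations` produces directly (toy author ids, not from the dataset):

```python
from itertools import combinations

authors = [101, 102, 103]  # a paper with k = 3 co-authors (made-up ids)
clique = {frozenset(p) for p in combinations(authors, 2)}
print(len(clique))  # k(k-1)/2 = 3 undirected edges, i.e. a triangle
```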
| Dataset statistics | |
|---|---|
| Nodes | 5242 |
| Edges | 14496 |
| Nodes in largest WCC | 4158 (0.793) |
| Edges in largest WCC | 13428 (0.926) |
| Nodes in largest SCC | 4158 (0.793) |
| Edges in largest SCC | 13428 (0.926) |
| Average clustering coefficient | 0.5296 |
| Number of triangles | 48260 |
| Fraction of closed triangles | 0.3619 |
| Diameter (longest shortest path) | 17 |
| 90-percentile effective diameter | 7.6 |
# Load and preprocess the dataset
import pandas as pd
# the first 4 lines of the SNAP file are '#' comments; line 3 doubles as the header row
df = pd.read_csv('CA-GrQc.txt', sep="\t", header=3)
# drop self-loops, then collapse both directions of each edge into one undirected frozenset
edges_no_selfloops = [frozenset(x) for x in df.to_numpy() if x[0] != x[1]]
df_edges = list(set(edges_no_selfloops))
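Running the same preprocessing on a toy directed edge list shows why the `frozenset` trick collapses self-loops, repeats, and both directions of an edge (node ids here are made up):

```python
rows = [(1, 2), (2, 1), (3, 3), (2, 4), (4, 2), (1, 2)]     # raw directed pairs
no_selfloops = [frozenset(r) for r in rows if r[0] != r[1]]  # drops (3, 3)
undirected = list(set(no_selfloops))  # frozenset({1, 2}) == frozenset({2, 1})
print(len(undirected))  # 2 unique undirected edges
```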
import random
class triest:
    def __init__(self, M):
        """ Initialization
        M (int): max number of edges in the edge sample S (at least 6)
        t (int): time t in the stream, t >= 0
        tau (int): global triangle counter
        localCounter (dict): local triangle counters for nodes u in V^(t)
        S (set): edge sample of up to M edges from the stream
        """
        self.M = M
        self.t = 0
        self.tau = 0
        self.localCounter = {}
        self.S = set()

    def reservoir(self, e):
        """ Standard reservoir sampling
        e (frozenset): one undirected edge {a, b}
        Returns
            bool: False if the edge sample S is not modified, True otherwise
        """
        if self.t <= self.M:
            return True
        elif random.random() <= self.M / self.t:
            # random.sample on a set is deprecated (removed in Python 3.11),
            # so convert to a tuple before drawing the edge to evict
            removeEdge = random.choice(tuple(self.S))
            self.S.remove(removeEdge)
            self.updateCounters(0, removeEdge)
            return True
        return False

    def updateCounters(self, operation, newEdge):
        """ Increment/decrement counters after each insertion/removal
        operation (int): 1 if insertion (+), 0 if removal (-)
        newEdge (frozenset): edge to insert/remove
        """
        Nu = set()  # neighbourhood of u in G^S
        Nv = set()  # neighbourhood of v in G^S
        l = list(newEdge)  # fix an order: l[0] is u, l[1] is v
        for x in self.S:
            if l[0] in x:
                temp = set(x)       # copy: elements cannot be removed from a frozenset
                temp.remove(l[0])
                Nu.add(temp.pop())  # the remaining element is a neighbour of u
            elif l[1] in x:         # elif suffices: no self-loops, no duplicate edges
                temp = set(x)
                temp.remove(l[1])
                Nv.add(temp.pop())
        Nuv = Nu.intersection(Nv)   # each shared neighbour closes one triangle with {u, v}
        for c in Nuv:
            if operation == 1:
                self.tau += 1
                self.localCounter[c] = self.localCounter.get(c, 0) + 1
                self.localCounter[l[0]] = self.localCounter.get(l[0], 0) + 1
                self.localCounter[l[1]] = self.localCounter.get(l[1], 0) + 1
            elif operation == 0:
                if self.tau > 0:
                    self.tau -= 1
                self.localCounter[c] = self.localCounter.get(c, 0) - 1
                self.localCounter[l[0]] = self.localCounter.get(l[0], 0) - 1
                self.localCounter[l[1]] = self.localCounter.get(l[1], 0) - 1
                if self.localCounter[l[0]] <= 0:
                    del self.localCounter[l[0]]
                if self.localCounter[l[1]] <= 0:
                    del self.localCounter[l[1]]
                if self.localCounter[c] <= 0:
                    del self.localCounter[c]

    def call(self, stream):
        """ Triest-Base algorithm
        stream (list of frozensets): edge stream
        Returns a dictionary with:
            globalcount (int): global triangle counter tau, key "globalcount"
            localcount (dict): local triangle counters, key "localcount"
            estimation (float): estimated global triangle count, key "estimation"
        """
        for ed in stream:
            self.t += 1
            if self.reservoir(ed):          # True means S will be modified
                self.updateCounters(1, ed)  # update counters first
                self.S.add(ed)              # then add the edge to the sample S
        # scaling factor xi^(t) = max(1, t(t-1)(t-2) / (M(M-1)(M-2)))
        epsilon = max(1, (self.t * (self.t - 1) * (self.t - 2))
                      / (self.M * (self.M - 1) * (self.M - 2)))
        estimation = epsilon * self.tau  # estimated global triangle count
        return {"globalcount": self.tau, "localcount": self.localCounter, "estimation": estimation}
# Test set with 5 nodes, 8 edges, 5 triangles
# 1-4-5, 1-4-2, 1-2-3, 1-2-5, 2-4-5
edges = [frozenset({1,2}),frozenset({1,3}),frozenset({1,5}),frozenset({1,4}),
frozenset({2,3}),frozenset({2,5}),frozenset({2,4}),frozenset({4,5})]
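As a sanity check that is independent of the streaming code, the toy graph's triangle count can be verified by brute force over all node triples (the edge list is repeated so the snippet is self-contained):

```python
from itertools import combinations

edges = [frozenset({1, 2}), frozenset({1, 3}), frozenset({1, 5}), frozenset({1, 4}),
         frozenset({2, 3}), frozenset({2, 5}), frozenset({2, 4}), frozenset({4, 5})]
edge_set = set(edges)
nodes = sorted({n for e in edges for n in e})
# a triple of nodes is a triangle iff all three pairwise edges exist
triangles = [t for t in combinations(nodes, 3)
             if all(frozenset(p) in edge_set for p in combinations(t, 2))]
print(len(triangles))  # 5
```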
result = triest(len(edges)).call(edges)
result
{'globalcount': 5,
'localcount': {1: 4, 2: 4, 3: 1, 5: 3, 4: 3},
'estimation': 5}
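The streaming estimate is the raw counter τ rescaled by ξ(t) = max(1, t(t-1)(t-2) / (M(M-1)(M-2))), which for t ≫ M grows roughly like (t/M)³. Tabulating it for our stream length (t = 14484 undirected edges) and a few of the sample sizes tried below gives a feel for how much a small counter gets amplified:

```python
t = 14484  # number of undirected edges in the CA-GrQc stream
for M in (14484, 10000, 5000, 1000):
    xi = max(1, (t * (t - 1) * (t - 2)) / (M * (M - 1) * (M - 2)))
    print(M, round(xi, 2))  # xi == 1 when M == t, roughly (t/M)**3 otherwise
```

This also explains the high variance at small M: with M = 1000, each sampled triangle is worth about three thousand triangles in the estimate.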
globalcounts = []
estimations = []
times = []
M = len(df_edges)
import time
start_time = time.time()
result = triest(M).call(df_edges)
print("--- %s seconds ---" % (time.time() - start_time))
print(result["globalcount"], result["estimation"])
globalcounts.append(result["globalcount"])
estimations.append(result["estimation"])
times.append(round((time.time() - start_time),5))
--- 56.50678491592407 seconds --- 48260 48260
M = 14000
start_time = time.time()
result = triest(M).call(df_edges)
print("--- %s seconds ---" % (time.time() - start_time))
print(result["globalcount"], result["estimation"])
globalcounts.append(result["globalcount"])
estimations.append(result["estimation"])
times.append(round((time.time() - start_time),5))
--- 59.25832557678223 seconds --- 43388 48045.43276012989
M = 10000
start_time = time.time()
result = triest(M).call(df_edges)
print("--- %s seconds ---" % (time.time() - start_time))
print(result["globalcount"], result["estimation"])
globalcounts.append(result["globalcount"])
estimations.append(result["estimation"])
times.append(round((time.time() - start_time),5))
--- 65.48468589782715 seconds --- 15880 48253.57476636687
M = 5000
start_time = time.time()
result = triest(M).call(df_edges)
print("--- %s seconds ---" % (time.time() - start_time))
print(result["globalcount"], result["estimation"])
globalcounts.append(result["globalcount"])
estimations.append(result["estimation"])
times.append(round((time.time() - start_time),5))
--- 41.97685766220093 seconds --- 2067 50251.94712906114
M = 1000
start_time = time.time()
result = triest(M).call(df_edges)
print("--- %s seconds ---" % (time.time() - start_time))
print(result["globalcount"], result["estimation"])
globalcounts.append(result["globalcount"])
estimations.append(result["estimation"])
times.append(round((time.time() - start_time),5))
--- 3.1309874057769775 seconds --- 15 45620.63582054054
M = 500
start_time = time.time()
result = triest(M).call(df_edges)
print("--- %s seconds ---" % (time.time() - start_time))
print(result["globalcount"], result["estimation"])
globalcounts.append(result["globalcount"])
estimations.append(result["estimation"])
times.append(round((time.time() - start_time),5))
--- 0.9302718639373779 seconds --- 4 97421.54214502605
M = 100
start_time = time.time()
result = triest(M).call(df_edges)
print("--- %s seconds ---" % (time.time() - start_time))
print(result["globalcount"], result["estimation"])
globalcounts.append(result["globalcount"])
estimations.append(result["estimation"])
times.append(round((time.time() - start_time),5))
--- 0.07601237297058105 seconds --- 0 0.0
M = 50
start_time = time.time()
result = triest(M).call(df_edges)
print("--- %s seconds ---" % (time.time() - start_time))
print(result["globalcount"], result["estimation"])
globalcounts.append(result["globalcount"])
estimations.append(result["estimation"])
times.append(round((time.time() - start_time),5))
--- 0.03733205795288086 seconds --- 0 0.0
M = 10
start_time = time.time()
result = triest(M).call(df_edges)
print("--- %s seconds ---" % (time.time() - start_time))
print(result["globalcount"], result["estimation"])
globalcounts.append(result["globalcount"])
estimations.append(result["estimation"])
times.append(round((time.time() - start_time),5))
--- 0.020941495895385742 seconds --- 0 0.0
M = 6
start_time = time.time()
result = triest(M).call(df_edges)
print("--- %s seconds ---" % (time.time() - start_time))
print(result["globalcount"], result["estimation"])
globalcounts.append(result["globalcount"])
estimations.append(result["estimation"])
times.append(round((time.time() - start_time),5))
--- 0.021905183792114258 seconds --- 0 0.0
import random
class triestImpr:
    def __init__(self, M):
        """ Initialization
        M (int): max number of edges in the edge sample S (at least 6)
        t (int): time t in the stream, t >= 0
        tau (float): global triangle counter (weighted, so no longer an integer)
        localCounter (dict): local triangle counters for nodes u in V^(t)
        S (set): edge sample of up to M edges from the stream
        """
        self.M = M
        self.t = 0
        self.tau = 0
        self.localCounter = {}
        self.S = set()

    def reservoir(self, e):
        """ Standard reservoir sampling
        e (frozenset): one undirected edge {a, b}
        Returns
            bool: False if the edge sample S is not modified, True otherwise
        """
        if self.t <= self.M:
            return True
        elif random.random() <= self.M / self.t:
            removeEdge = random.choice(tuple(self.S))
            self.S.remove(removeEdge)
            # IMPROVEMENT 2: counters are never decremented on removal
            return True
        return False

    def updateCounters(self, newEdge):  # IMPROVEMENT 3: operation argument removed
        """ Increment counters for each arriving edge
        newEdge (frozenset): arriving edge
        """
        Nu = set()
        Nv = set()
        l = list(newEdge)
        for x in self.S:
            if l[0] in x:
                temp = set(x)
                temp.remove(l[0])
                Nu.add(temp.pop())
            elif l[1] in x:
                temp = set(x)
                temp.remove(l[1])
                Nv.add(temp.pop())
        Nuv = Nu.intersection(Nv)
        # IMPROVEMENT 3: increment by the weight eta(t) = max(1, (t-1)(t-2) / (M(M-1)))
        # instead of by 1
        weight = max(1, ((self.t - 1) * (self.t - 2)) / (self.M * (self.M - 1)))
        for c in Nuv:
            self.tau += weight
            self.localCounter[c] = self.localCounter.get(c, 0) + weight
            self.localCounter[l[0]] = self.localCounter.get(l[0], 0) + weight
            self.localCounter[l[1]] = self.localCounter.get(l[1], 0) + weight

    def call(self, stream):
        """ Triest-Impr algorithm
        stream (list of frozensets): edge stream
        Returns a dictionary with:
            globalcount (float): global triangle counter tau, key "globalcount"
            localcount (dict): local triangle counters, key "localcount"
            estimation (float): estimated global triangle count, key "estimation"
        """
        for ed in stream:
            self.t += 1
            self.updateCounters(ed)  # IMPROVEMENT 1: moved before the sampling step
            if self.reservoir(ed):
                self.S.add(ed)
        # the weighted increments already compensate for sampling, so tau itself
        # is the estimate; no additional xi^(t) rescaling is applied
        estimation = self.tau
        return {"globalcount": self.tau, "localcount": self.localCounter, "estimation": estimation}
result = triestImpr(len(edges)).call(edges)
result
{'globalcount': 5,
'localcount': {1: 4, 2: 4, 3: 1, 5: 3, 4: 3},
'estimation': 5}
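Trièst-Impr never decrements counters; instead each increment is weighted by η(t) = max(1, (t-1)(t-2) / (M(M-1))), which compensates for the probability that the two other edges of a discovered triangle are still in the sample. Tabulating η for M = 1000 shows it stays at 1 until t exceeds M and then grows roughly like (t/M)²:

```python
M = 1000
for t in (500, 1000, 2000, 14484):
    eta = max(1, ((t - 1) * (t - 2)) / (M * (M - 1)))
    print(t, round(eta, 2))  # 1, 1, then roughly (t/M)**2
```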
globalcounts_impr = []
estimations_impr = []
times_impr = []
M = len(df_edges)
import time
start_time = time.time()
result = triestImpr(M).call(df_edges)
print("--- %s seconds ---" % (time.time() - start_time))
print(result["globalcount"], result["estimation"])
globalcounts_impr.append(result["globalcount"])
estimations_impr.append(result["estimation"])
times_impr.append(round((time.time() - start_time),5))
--- 58.49200701713562 seconds --- 48260 48260
M = 14000
start_time = time.time()
result = triestImpr(M).call(df_edges)
print("--- %s seconds ---" % (time.time() - start_time))
print(result["globalcount"], result["estimation"])
globalcounts_impr.append(result["globalcount"])
estimations_impr.append(result["estimation"])
times_impr.append(round((time.time() - start_time),5))
--- 54.225528717041016 seconds --- 48305.44766198478 53490.73793656917
M = 10000
start_time = time.time()
result = triestImpr(M).call(df_edges)
print("--- %s seconds ---" % (time.time() - start_time))
print(result["globalcount"], result["estimation"])
globalcounts_impr.append(result["globalcount"])
estimations_impr.append(result["estimation"])
times_impr.append(round((time.time() - start_time),5))
--- 48.44452238082886 seconds --- 47946.00241794199 145690.55493844906
M = 5000
start_time = time.time()
result = triestImpr(M).call(df_edges)
print("--- %s seconds ---" % (time.time() - start_time))
print(result["globalcount"], result["estimation"])
globalcounts_impr.append(result["globalcount"])
estimations_impr.append(result["estimation"])
times_impr.append(round((time.time() - start_time),5))
--- 32.40800499916077 seconds --- 46676.071291138316 1134767.0375977606
M = 1000
start_time = time.time()
result = triestImpr(M).call(df_edges)
print("--- %s seconds ---" % (time.time() - start_time))
print(result["globalcount"], result["estimation"])
globalcounts_impr.append(result["globalcount"])
estimations_impr.append(result["estimation"])
times_impr.append(round((time.time() - start_time),5))
--- 6.876000642776489 seconds --- 46698.71081081082 142028325.27924934
M = 500
start_time = time.time()
result = triestImpr(M).call(df_edges)
print("--- %s seconds ---" % (time.time() - start_time))
print(result["globalcount"], result["estimation"])
globalcounts_impr.append(result["globalcount"])
estimations_impr.append(result["estimation"])
times_impr.append(round((time.time() - start_time),5))
--- 3.4985780715942383 seconds --- 46253.5224448898 1126522371.555185
M = 100
start_time = time.time()
result = triestImpr(M).call(df_edges)
print("--- %s seconds ---" % (time.time() - start_time))
print(result["globalcount"], result["estimation"])
globalcounts_impr.append(result["globalcount"])
estimations_impr.append(result["estimation"])
times_impr.append(round((time.time() - start_time),5))
--- 0.7228474617004395 seconds --- 46911.55797979798 143972725068.56335
M = 50
start_time = time.time()
result = triestImpr(M).call(df_edges)
print("--- %s seconds ---" % (time.time() - start_time))
print(result["globalcount"], result["estimation"])
globalcounts_impr.append(result["globalcount"])
estimations_impr.append(result["estimation"])
times_impr.append(round((time.time() - start_time),5))
--- 0.41486120223999023 seconds --- 0 0.0
M = 10
start_time = time.time()
result = triestImpr(M).call(df_edges)
print("--- %s seconds ---" % (time.time() - start_time))
print(result["globalcount"], result["estimation"])
globalcounts_impr.append(result["globalcount"])
estimations_impr.append(result["estimation"])
times_impr.append(round((time.time() - start_time),5))
--- 0.13946127891540527 seconds --- 0 0.0
M = 6
start_time = time.time()
result = triestImpr(M).call(df_edges)
print("--- %s seconds ---" % (time.time() - start_time))
print(result["globalcount"], result["estimation"])
globalcounts_impr.append(result["globalcount"])
estimations_impr.append(result["estimation"])
times_impr.append(round((time.time() - start_time),5))
--- 0.10987019538879395 seconds --- 0 0.0
algotype = ["triest-base"]*len(times)
Ms = [len(df_edges),14000,10000,5000,1000,500,100,50,10,6]
triest_base = pd.DataFrame(
{'times': times,
'estimations': estimations,
'globalcounts': globalcounts,
'type': algotype,
'M': Ms
})
algotype = ["triest_impr"]*len(times_impr)
triest_impr = pd.DataFrame(
{'times': times_impr,
'estimations': estimations_impr,
'globalcounts': globalcounts_impr,
'type': algotype,
'M': Ms
})
df_results = pd.concat([triest_base, triest_impr])  # DataFrame.append was removed in pandas 2.0
df_results
| times | estimations | globalcounts | type | M | |
|---|---|---|---|---|---|
| 0 | 56.50778 | 4.826000e+04 | 48260.000000 | triest-base | 14484 |
| 1 | 59.25833 | 4.804543e+04 | 43388.000000 | triest-base | 14000 |
| 2 | 65.48469 | 4.825357e+04 | 15880.000000 | triest-base | 10000 |
| 3 | 41.97686 | 5.025195e+04 | 2067.000000 | triest-base | 5000 |
| 4 | 3.13199 | 4.562064e+04 | 15.000000 | triest-base | 1000 |
| 5 | 0.93127 | 9.742154e+04 | 4.000000 | triest-base | 500 |
| 6 | 0.07701 | 0.000000e+00 | 0.000000 | triest-base | 100 |
| 7 | 0.03829 | 0.000000e+00 | 0.000000 | triest-base | 50 |
| 8 | 0.02094 | 0.000000e+00 | 0.000000 | triest-base | 10 |
| 9 | 0.02290 | 0.000000e+00 | 0.000000 | triest-base | 6 |
| 0 | 58.49300 | 4.826000e+04 | 48260.000000 | triest_impr | 14484 |
| 1 | 54.22596 | 5.349074e+04 | 48305.447662 | triest_impr | 14000 |
| 2 | 48.44452 | 1.456906e+05 | 47946.002418 | triest_impr | 10000 |
| 3 | 32.40900 | 1.134767e+06 | 46676.071291 | triest_impr | 5000 |
| 4 | 6.87600 | 1.420283e+08 | 46698.710811 | triest_impr | 1000 |
| 5 | 3.49858 | 1.126522e+09 | 46253.522445 | triest_impr | 500 |
| 6 | 0.72364 | 1.439727e+11 | 46911.557980 | triest_impr | 100 |
| 7 | 0.41486 | 0.000000e+00 | 0.000000 | triest_impr | 50 |
| 8 | 0.14047 | 0.000000e+00 | 0.000000 | triest_impr | 10 |
| 9 | 0.11083 | 0.000000e+00 | 0.000000 | triest_impr | 6 |
import plotly.express as px
fig = px.scatter(df_results, x="M", y="globalcounts", color="type",
title="Global Triangle Count for Triest-Base and Improved Version Based on M")
fig.add_hline(y = 48260)
fig.show()
import plotly.express as px
fig = px.scatter(df_results, x="M", y="estimations", color="type",
title="Triangle Count Estimation for Triest-Base and Improved Version Based on M")
fig.add_hline(y = 48260)
fig.show()
fig = px.line(df_results, x="M", y="times", color="type",
title="Runtime for Triest-Base and Improved Version Based on M")
fig.show()
What were the challenges you have faced when implementing the algorithm?
In Trièst-Base, there are many replacements in the edge sample early in the stream, which leads to the deletion of several local counters. In addition, late edges are more likely to be ignored, which can leave no local counters at all when the sample size is not relatively large. This can be mitigated by increasing the sample size or by using Trièst-Impr, which never decrements counters. In general, Trièst-Base is sensitive to the dataset and favours edges that arrive while the stream position t is still close to the sample size M.
Can the algorithm be easily parallelized? If yes, how? If not, why? Explain.
It is possible to parallelize the algorithm. One option is to partition the edge stream into substreams and process each partition independently. Another option is to parallelize the counters using Spark, since addition and subtraction of counters are associative and commutative: the associative property states that operands can be re-grouped without changing the result, and the commutative property states that they can be re-ordered without changing the result, so partial counters can be merged in any order.
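A minimal sketch of that merge step (toy per-partition counters, not tied to Spark's API): because counter addition is associative and commutative, the partial local counters can be reduced in any order.

```python
from collections import Counter
from functools import reduce

# local triangle counters computed independently on three stream partitions
partition_counts = [Counter({1: 2, 2: 1}), Counter({2: 3, 5: 1}), Counter({1: 1})]
merged = reduce(lambda a, b: a + b, partition_counts)  # merge order does not matter
print(dict(merged))  # {1: 3, 2: 4, 5: 1}
```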
Does the algorithm work for unbounded graph streams? Explain.
The algorithm works with time $t$, where $t \geq 0$. More specifically, "when queried at the end of time $t$, trièst-base returns $\xi^{(t)} \tau^{(t)}$ as the estimation for the global triangle count". So for an unbounded graph stream, the algorithm can return an estimate of the global triangle count at any point, i.e. it estimates the total number of triangles seen so far in the stream, while never storing more than $M$ edges.
Does the algorithm support edge deletions? If not, what modification would it need? Explain.
No. However, the fully dynamic algorithm proposed in the same paper, Trièst-FD, can handle them: "It is based on random pairing, a sampling scheme that extends reservoir sampling and can handle deletions. The idea behind the RP scheme is that edge deletions seen on the stream will be compensated by future edge insertions".